package com.atguigu.tingshu.user.service.impl;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSON;
import com.atguigu.tingshu.common.constant.KafkaConstant;
import com.atguigu.tingshu.common.constant.RedisConstant;
import com.atguigu.tingshu.common.constant.SystemConstant;
import com.atguigu.tingshu.common.service.KafkaService;
import com.atguigu.tingshu.common.util.MongoUtil;
import com.atguigu.tingshu.model.user.UserListenProcess;
import com.atguigu.tingshu.user.service.UserListenProcessService;
import com.atguigu.tingshu.vo.album.TrackStatMqVo;
import com.atguigu.tingshu.vo.user.UserListenProcessVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.math.BigDecimal;
import java.util.Date;
import java.util.concurrent.TimeUnit;

@Service
@SuppressWarnings({"all"})
public class UserListenProcessServiceImpl implements UserListenProcessService {

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private KafkaService kafkaService;

    /**
     * 获取当前用户收听声音播放进度
     *
     * @param userId  用户ID
     * @param trackId 声音ID
     * @return
     */
    @Override
    public BigDecimal getTrackBreakSecond(Long userId, Long trackId) {
        //1.构建查询条件
        Query query = new Query();
        query.addCriteria(Criteria.where("userId").is(userId).and("trackId").is(trackId));
        //2.执行查询播放进度
        UserListenProcess userListenProcess = mongoTemplate.findOne(query, UserListenProcess.class, MongoUtil.getCollectionName(MongoUtil.MongoCollectionEnum.USER_LISTEN_PROCESS, userId));
        if (userListenProcess != null) {
            return userListenProcess.getBreakSecond();
        }
        return new BigDecimal("0.00");
    }

    /**
     * 更新当前用户收听声音播放进度
     *
     * @param userId              用户ID
     * @param userListenProcessVo 播放进度信息
     * @return
     */
    @Override
    public void updateListenProcess(Long userId, UserListenProcessVo userListenProcessVo) {
        //1.根据用户ID+声音ID获取播放进度
        Query query = new Query();
        //1.1 设置查询条件
        query.addCriteria(Criteria.where("userId").is(userId).and("trackId").is(userListenProcessVo.getTrackId()));
        //1.2 设置查询第一条记录(避免小程序暂停后恢复播放将积压更新进度请求并发发起，导致新增多条播放进度)
        query.limit(1);
        UserListenProcess userListenProcess = mongoTemplate.findOne(query, UserListenProcess.class, MongoUtil.getCollectionName(MongoUtil.MongoCollectionEnum.USER_LISTEN_PROCESS, userId));
        if (userListenProcess == null) {
            //2.如果播放进度不存在-新增播放进度
            userListenProcess = new UserListenProcess();
            userListenProcess.setUserId(userId);
            userListenProcess.setAlbumId(userListenProcessVo.getAlbumId());
            userListenProcess.setTrackId(userListenProcessVo.getTrackId());
            userListenProcess.setBreakSecond(userListenProcessVo.getBreakSecond());
            userListenProcess.setIsShow(1);
            userListenProcess.setCreateTime(new Date());
            userListenProcess.setUpdateTime(new Date());
        } else {
            //3.如果播放进度存在-更新进度
            userListenProcess.setBreakSecond(userListenProcessVo.getBreakSecond());
            userListenProcess.setUpdateTime(new Date());
        }
        mongoTemplate.save(userListenProcess, MongoUtil.getCollectionName(MongoUtil.MongoCollectionEnum.USER_LISTEN_PROCESS, userId));

        //4.采用Redis提供set k v nx ex 确保在规定时间内（24小时/当日内）播放进度统计更新1次
        String key = RedisConstant.USER_TRACK_REPEAT_STAT_PREFIX + userId + ":" + userListenProcessVo.getTrackId();
        long ttl = DateUtil.endOfDay(new Date()).getTime() - System.currentTimeMillis();
        Boolean flag = redisTemplate.opsForValue().setIfAbsent(key, userListenProcess.getTrackId(), ttl, TimeUnit.MILLISECONDS);
        if (flag) {
            //5.如果是首次更新播放进度，发送消息到Kafka话题
            //5.1 构建更新声音播放进度MQVO对象
            TrackStatMqVo mqVo = new TrackStatMqVo();
            //生成业务唯一标识，消费者端（专辑服务、搜索服务）用来做幂等性处理，确保一个消息只能只被处理一次
            mqVo.setBusinessNo(IdUtil.fastSimpleUUID());
            mqVo.setAlbumId(userListenProcessVo.getAlbumId());
            mqVo.setTrackId(userListenProcessVo.getTrackId());
            mqVo.setStatType(SystemConstant.TRACK_STAT_PLAY);
            mqVo.setCount(1);
            //5.2 发送消息到更新声音统计话题中
            kafkaService.sendMessage(KafkaConstant.QUEUE_TRACK_STAT_UPDATE, JSON.toJSONString(mqVo));
        }
    }

}
